{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-how-to-use-modulestep.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "\n",
        "# How to create Module, ModuleVersion, and use them in a pipeline with ModuleStep.\n",
        "In this notebook, we introduce the concept of versioned modules and how to use them in an Azure Machine Learning Pipeline.\n",
        "\n",
        "The core idea behind introducing Module, ModuleVersion and ModuleStep is to allow the separation between reusable executable components and their actual usage. These reusable software components (such as scripts or executables) can be used in different scenarios and by different users. This follows the same idea of separating software frameworks/libraries and their actual usage in applications. Module and ModuleVersion take the role of the reusable executable components where ModuleStep is there to link them to an actual usage.\n",
        "\n",
        "A module is an elaborated container of its versions, where each version is the actual computational unit. It is up to users to define the semantics of this hierarchical structure of container and versions. For example, there could be different versions for different use cases, development progress, etc.\n",
        "\n",
        "Each ModuleVersion may have inputs, outputs and rely on parameters and its environment configuration to operate.\n",
        "\n",
        "Because Modules can now be separated from execution in a pipeline, there's a need for a mechanism to reconnect these again, and allow using Modules and their versions in a Pipeline. This is done using a new kind of Step called ModuleStep, which allows embedding a Module (and more precisely, a version of it) in a Pipeline.\n",
        " \n",
        "This notebook shows the usage of a module that computes the sum and product of two numbers. As a module can only be used as a step in a pipeline, we define two different versions for it, to be used in two different use cases:\n",
        "\n",
        "1) As the module powering the initial step of a pipeline, where the step does not receive any input from preceding steps.\n",
        "\n",
        "2) As a module powering a step in the pipeline that receives inputs from preceding steps.\n",
        "\n",
        "Once these two versions are defined, we show how to embed them as steps in the pipeline."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prerequisites and AML Basics\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc.\n",
        "\n",
        "### Initialization Steps"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace, Experiment, Datastore, RunConfiguration\n",
        "from azureml.core.compute import AmlCompute\n",
        "from azureml.core.compute import ComputeTarget\n",
        "from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter\n",
        "from azureml.pipeline.core.graph import InputPortDef, OutputPortDef\n",
        "from azureml.pipeline.core.module import Module\n",
        "from azureml.pipeline.steps import ModuleStep\n",
        "\n",
        "workspace = Workspace.from_config()\n",
        "print(workspace.name, workspace.resource_group, workspace.location, workspace.subscription_id, sep = '\\n')\n",
        "\n",
        "aml_compute_target = \"cpu-cluster\"\n",
        "try:\n",
        "    aml_compute = AmlCompute(workspace, aml_compute_target)\n",
        "    print(\"Found existing compute target: {}\".format(aml_compute_target))\n",
        "except:\n",
        "    print(\"Creating new compute target: {}\".format(aml_compute_target))\n",
        "    \n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n",
        "                                                                min_nodes = 1, \n",
        "                                                                max_nodes = 4)    \n",
        "    aml_compute = ComputeTarget.create(workspace, aml_compute_target, provisioning_config)\n",
        "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
        "\n",
        "datastore = Datastore(workspace=workspace, name=\"workspaceblobstore\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create a Module"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A Module is a container that manages computational units. Each such computational unit is a version of the module, and is called a ModuleVersion. We start by either creating a module or fetching an existing one by its ID or by its name."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "module = Module.create(workspace, name=\"AddAndMultiply\", description=\"A module that adds and multiplies\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Calculation entry ModuleVersion"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A ModuleVersion is an actual computational unit. Defining it involves defining its inputs, outputs, the computation and other configuration items. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Here we define that this version is to be used at the beginning of the pipeline, hence does not have incoming ports, only outgoing."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "module-remarks-sample"
        ]
      },
      "outputs": [],
      "source": [
        "out_sum = OutputPortDef(name=\"out_sum\", default_datastore_name=datastore.name, default_datastore_mode=\"mount\", \n",
        "                        label=\"Sum of two numbers\")\n",
        "out_prod = OutputPortDef(name=\"out_prod\", default_datastore_name=datastore.name, default_datastore_mode=\"mount\", \n",
        "                         label=\"Product of two numbers\")\n",
        "entry_version = module.publish_python_script(\"calculate.py\", \"initial\", \n",
        "                                             inputs=[], outputs=[out_sum, out_prod], params = {\"initialNum\":12},\n",
        "                                             version=\"1\", source_directory=\"./calc\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Calculation middle/end ModuleVersion"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Another version of the module performs a computation in the middle or at the end of the pipeline. This version has both outputs and inputs, as it is to be either followed by another computation, or emits its outputs."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "moduleversion-remarks-sample"
        ]
      },
      "outputs": [],
      "source": [
        "in1_mid = InputPortDef(name=\"in1\", default_datastore_mode=\"mount\", \n",
        "                   default_data_reference_name=datastore.name, label=\"First input number\")\n",
        "in2_mid = InputPortDef(name=\"in2\", default_datastore_mode=\"mount\", \n",
        "                   default_data_reference_name=datastore.name, label=\"Second input number\")\n",
        "out_sum_mid = OutputPortDef(name=\"out_sum\", default_datastore_name=datastore.name, default_datastore_mode=\"mount\",\n",
        "                            label=\"Sum of two numbers\")\n",
        "out_prod_mid = OutputPortDef(name=\"out_prod\", default_datastore_name=datastore.name, default_datastore_mode=\"mount\",\n",
        "                             label=\"Product of two numbers\")\n",
        "module.publish_python_script(\n",
        "    \"calculate.py\", \"middle\", inputs=[in1_mid, in2_mid], outputs=[out_sum_mid, out_prod_mid], version=\"2\", is_default=True, \n",
        "    source_directory=\"./calc\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Using a Module in a Pipeline with ModuleStep"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Introduction"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Using a Module, and more precisely, a specific version, in a pipeline is done via a specialized kind of step. This step is called ModuleStep. It is used as a step in a pipeline, one that holds enough information that allows pinpointing to a specific ModuleVersion. \n",
        "\n",
        "Another responsibility of a ModuleStep is to wire the actual data that is used in the pipeline to the inputs/outputs definitions of the ModuleVersion. This wiring is done by mapping each of the inputs and the outputs definitions to a data element in the pipeline. Defining the wiring is done using a dictionary whose keys are the name of the inputs/outputs, and the mapped value is the data element (e.g., a PipelineData object)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Deciding which ModuleVersion to use - resolving"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "It is up to the ModuleStep to decide which ModuleVersion to use. That decision is based on the parameters given to the ModuleStep, and it follows this process:\n",
        "1. If a ModuleVersion object was provided, use it.\n",
        "2. For the given Module object, if a version was provided, use it.\n",
        "3. The given Module object resolves which is the right version:\n",
        "  1. If a default ModuleVersion was defined for the Module, use it.\n",
        "  2. If all the versions of the ModuleVersions in the Module follow semantic versioning, take the one with the highest version.\n",
        "  3. Take the ModuleVersion with the most recent update."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### First Step and its wires"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<p>The first step in a pipeline does not have incoming inputs, but it does have outputs. For that we'd use the ModuleVersion that was designed for this use case.</p>\n",
        "We start off by preparing the outgoing edges as two PipelineData objects (to be later linked to another step), as well as wiring these to the moduleVersion's outputs by creating a dictionary mapping."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "first_sum = PipelineData(\"sum_out\", datastore=datastore, output_mode=\"mount\",is_directory=False)\n",
        "first_prod = PipelineData(\"prod_out\", datastore=datastore, output_mode=\"mount\",is_directory=False)\n",
        "step_output_wiring = {\"out_sum\":first_sum, \"out_prod\":first_prod}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Initial ModuleStep"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<p>In order for the step to know which ModuleVersion to use, we provided the initial ModuleVersion object. We wire the ModuleVersion's outputs with the <i> step_output_wiring</i> map we just created. </p>\n",
        "The initial ModuleStep uses the ModuleVersion that does not have inputs from the pipeline, however, it still needs to receive two numbers to operate upon. We'll provide these numbers as arguments to the step. The first is provided as a parameter, the other one is hard coded."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "first_num_param = PipelineParameter(name=\"initialNum\", default_value=17)\n",
        "first_step = ModuleStep(module_version=entry_version,\n",
        "                 inputs_map={}, outputs_map=step_output_wiring, \n",
        "                 runconfig=RunConfiguration(), \n",
        "                 compute_target=aml_compute, \n",
        "                 arguments = [\"--output_sum\", first_sum, \n",
        "                              \"--output_product\", first_prod,\n",
        "                              \"--arg_num1\", first_num_param, \n",
        "                              \"--arg_num2\", \"2\"])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Second step and its wires"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The second step in the pipeline receives its inputs from the previous step, and emits its outputs to the next step. Thus the ModuleStep here needs a different kind of ModuleVersion, one that has both inputs and outputs defined for. We have defined such ModuleVersion, and moreover, defined it to be the default version of our Module. This allows us to provide to the ModuleStep the Module object, which would resolve to that default ModuleVersion when needed."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Wires"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The wiring to the previous step relies on the PipelineData objects we defined before, and for them we create a new dictionary mapping to the ModuleVersion. The wiring to the next step requires us to define another pair of PipelineData objects, for which also a dictionary mapping is needed."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "modulestep-remarks-sample2"
        ]
      },
      "outputs": [],
      "source": [
        "middle_step_input_wiring = {\"in1\":first_sum, \"in2\":first_prod}\n",
        "middle_sum = PipelineData(\"middle_sum\", datastore=datastore, output_mode=\"mount\",is_directory=False)\n",
        "middle_prod = PipelineData(\"middle_prod\", datastore=datastore, output_mode=\"mount\",is_directory=False)\n",
        "middle_step_output_wiring = {\"out_sum\":middle_sum, \"out_prod\":middle_prod}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Middle ModuleStep - resolving to the default ModuleVersion"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "modulestep-remarks-sample"
        ]
      },
      "outputs": [],
      "source": [
        "middle_step = ModuleStep(module=module,\n",
        "                         inputs_map= middle_step_input_wiring, \n",
        "                         outputs_map= middle_step_output_wiring,\n",
        "                         runconfig=RunConfiguration(), compute_target=aml_compute,\n",
        "                         arguments = [\"--file_num1\", first_sum, \"--file_num2\", first_prod,\n",
        "                                      \"--output_sum\", middle_sum, \"--output_product\", middle_prod])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## End step and its wires"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The last step in the pipeline also has input and outputs, thus its configuration would be similar to the previous step. In this case we would still use Pipeline data as the step's outputs, even though they are not read by any following step, but rather act as the end result of the pipeline."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Wires"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "last_step_input_wiring = {\"in1\":middle_sum, \"in2\":middle_prod}\n",
        "end_sum = PipelineData(\"end_sum\", datastore=datastore, output_mode=\"mount\",is_directory=False)\n",
        "end_prod = PipelineData(\"end_prod\", datastore=datastore, output_mode=\"mount\",is_directory=False)\n",
        "last_step_output_wiring = {\"out_sum\":end_sum, \"out_prod\":end_prod}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Last ModuleStep - specifing the exact version"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "end_step = ModuleStep(module=module, version=\"2\",\n",
        "                      inputs_map= last_step_input_wiring,\n",
        "                      outputs_map= last_step_output_wiring,\n",
        "                      runconfig=RunConfiguration(), compute_target=aml_compute,\n",
        "                      arguments=[\"--file_num1\", middle_sum, \"--file_num2\", middle_prod,\n",
        "                                 \"--output_sum\", end_sum, \"--output_product\", end_prod])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Pipeline, experiment, submission"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The last thing to be done is to create a pipeline out of the previously defined steps, then create an experiment and submit the pipeline to the experiment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = Pipeline(workspace=workspace, steps=[first_step, middle_step, end_step])"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "experiment = Experiment(workspace, 'testmodulestesp')\n",
        "experiment.submit(pipeline)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": []
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to use ModuleStep with AML Pipelines",
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 14,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of ModuleStep"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}